package com.cantor.example;

import cn.hutool.core.lang.Console;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import com.cantor.consumer.discovery.ServiceDiscoverer;
import com.cantor.consumer.pojo.ServiceRef;
import com.cantor.core.center.RegistrationCenter;
import com.cantor.core.center.impl.ZooKeeperRegistrationCenter;
import com.cantor.core.config.CantorAppConfig;
import com.cantor.core.config.CantorAppConfigProperties;
import com.cantor.example.service.HelloService;
import org.apache.zookeeper.server.metric.AvgMinMaxCounter;

import java.util.concurrent.*;
import java.util.stream.IntStream;

/**
 * 测试Consumer消费端
 * 1. 测试单调用耗时
 * 2. 测试并发1(温和)
 * 3. 测试并发2(适中)
 * 4. 检验负载均衡(启动三个服务端,权重为5,3,1)
 * 5. 测试服务熔断(不配置熔断则失败后停止执行,配置后则失败后继续执行但返回null)
 */
public class TestConsumer {
    public static void main(String[] args) throws Exception {

        // 1. 准备CantorAppConfig
        CantorAppConfig appConfig = new CantorAppConfig(new CantorAppConfigProperties());

        // 2. 准备注册中心
        RegistrationCenter center = new ZooKeeperRegistrationCenter()
                .setAddress("127.0.0.1:2181");

        // 3. 准备服务发现者
        ServiceDiscoverer discoverer = ServiceDiscoverer.DEFAULT_DISCOVERER;

        // 4. 使用ServiceRef获取具体代理实现
        HelloService helloService = new ServiceRef<HelloService>()
                .setAppConfig(appConfig) // 设置APP配置
                .setCenter(center) // 设置注册中心
                .setDiscoverer(discoverer) // 设置服务发现者
                .setService(HelloService.class) // 要代理哪个接口?
                .setRetries(0)
                .setMock("fail:return null")
                .get(); // 获得最终实现代理类

        // 5. 调用方法
        // System.err.println(helloService.sayHello("Cantor"));

        while (true) {
            Console.log("+-------------------------------------+:");
            Console.log("| 请输入压力测试类型:");
            Console.log("| 1.单次调用");
            Console.log("| 2.压力测试(easy,100)");
            Console.log("| 3.压力测试(medium,100+=100)");
            Console.log("| 4.服务熔断验证");
            Console.log("| 5.负载均衡验证");
            Console.log("| 6.海啸压测(hard,1000+=1000)");
            Console.log("+-------------------------------------+:\n");
            String input = Console.input();
            switch (input) {
                case "1":
                    singleCallTest(helloService);
                    break;
                case "2":
                    concurrentTest(helloService);
                    break;
                case "3":
                    concurrentTest2(helloService);
                    break;
                case "4":
                    degradationTest(helloService);
                    break;
                case "5":
                    playLoadBalance(helloService);
                    break;
                case "6":
                    concurrentTest3(helloService);
                default:
                    Console.log("请输入正确序号.");
                    continue;
            }
        }


        ////////////////////////////////////////////////////////////////////////////

        // 单调用耗时
        // singleCallTest(helloService);

        ////////////////////////////////////////////////////////////////////////////

        // 并发测试1（温和） 100
        // concurrentTest(helloService);

        // 并发测试2（适中） 100+=100
        // concurrentTest2(helloService);

        // 并发测试3（极端） 1000+=1000
        // concurrentTest3(helloService);

        ////////////////////////////////////////////////////////////////////////////

        // 检测负载均衡
        // playLoadBalance(helloService);

        ////////////////////////////////////////////////////////////////////////////

        // 服务熔断测试(记得设置fail:return null)
        // degradationTest(helloService);

    }

    // 单调用耗时
    public static void singleCallTest(HelloService helloService) {
        long startTime = System.currentTimeMillis();

        String res = helloService.sayHello("高性能RPC");

        Console.error("结果:{} , 耗时: {}ms", res, System.currentTimeMillis() - startTime);
    }

    // 传入HelloService服务, 100个线程并发调用.
    public static void concurrentTest(HelloService helloService) throws InterruptedException {
        final int N_THREADS = 100; // 线程数
        CyclicBarrier barrier = new CyclicBarrier(N_THREADS); // 珊栏,让并发更极端
        CountDownLatch latch = new CountDownLatch(N_THREADS); // 计数器,所有线程跑完后输出负载情况
        final long startTime = System.currentTimeMillis();
        // long[] avgArr = new long[N_THREADS];
        AvgMinMaxCounter avgCounter = new AvgMinMaxCounter("singleAvg");

        // IntStream.range(0, N_THREADS).forEach(i -> {
        //     new Thread(() -> {
        //         try {
        //             barrier.await(); // 在珊栏前等待所有线程一起跑
        //         } catch (InterruptedException | BrokenBarrierException e) {
        //             e.printStackTrace();
        //         }
        //         long singleStart = System.currentTimeMillis();
        //         int a = RandomUtil.randomInt(100);
        //         int b = RandomUtil.randomInt(100);
        //         int r = helloService.cal(a, b);
        //         long singleElapse = System.currentTimeMillis() - singleStart;
        //         avgCounter.add(singleElapse);
        //         Console.log("{} + {} = {}, 耗时:{}ms", a, b, r, (int) singleElapse);
        //         latch.countDown(); // 本次调用完成
        //     }).start();
        // });
        // // 等待调用全部完成
        // latch.await();

        // 同步版
        for (int i = 0; i < N_THREADS; i++) {
            int a = RandomUtil.randomInt(100);
            int b = RandomUtil.randomInt(100);
            long singleStart = System.currentTimeMillis();
            int r = helloService.cal(a, b);
            long singleElapse = System.currentTimeMillis() - singleStart;
            avgCounter.add(singleElapse);
            Console.log("{} + {} = {}, 耗时:{}ms", a, b, r, singleElapse);
        }

        // 输出耗时
        Console.error("并发调用{}次完成, 耗时: {}ms, 平均耗时: {}ms",
                N_THREADS,
                System.currentTimeMillis() - startTime,
                avgCounter.getAvg()
        );
    }

    // 发送十波请求，第一次一百条，后续每次递增一百条，总共5500条, 查看耗时情况
    private static void concurrentTest2(HelloService helloService) throws InterruptedException {
        final int N_WAVES = 10;
        int nReqCount = 100;
        final int INCREMENT = 100;
        int globalCount = 0;
        AvgMinMaxCounter avgCounter = new AvgMinMaxCounter("singleAvg");

        for (int i = 0; i < N_WAVES; i++) {
            long startTime = System.currentTimeMillis();
            for (int j = 0; j < nReqCount; j++) {
                long singleStart = System.currentTimeMillis();
                int r = helloService.cal(RandomUtil.randomInt(100), RandomUtil.randomInt(100));
                long singleElapse = System.currentTimeMillis() - singleStart;
                avgCounter.add(singleElapse);
                // Console.log("result："+r);
            }
            Console.log("第{}波{}次RPC, 耗时: {}ms, 平均耗时: {}ms",
                    (i + 1),
                    nReqCount,
                    System.currentTimeMillis() - startTime,
                    avgCounter.getAvg()
            );
            globalCount += nReqCount;
            nReqCount += INCREMENT;
            avgCounter.reset();
        }
        Console.error("总共{}次RPC调用结束...", globalCount);
    }

    // 超级海啸压测, 发送十波请求，第一次一千条，后续每次递增一千条，总共5.5w条, 查看耗时情况
    private static void concurrentTest3(HelloService helloService) throws InterruptedException {
        final int N_WAVES = 10;
        int nReqCount = 1000;
        final int INCREMENT = 1000;
        int globalCount = 0;
        AvgMinMaxCounter avgCounter = new AvgMinMaxCounter("singleAvg");

        for (int i = 0; i < N_WAVES; i++) {
            long startTime = System.currentTimeMillis();
            for (int j = 0; j < nReqCount; j++) {
                long singleStart = System.currentTimeMillis();
                int r = helloService.cal(RandomUtil.randomInt(100), RandomUtil.randomInt(100));
                long singleElapse = System.currentTimeMillis() - singleStart;
                avgCounter.add(singleElapse);
                // Console.log("result："+r);
            }
            Console.log("第{}波{}次RPC, 耗时: {}ms, 平均耗时: {}ms",
                    (i + 1),
                    nReqCount,
                    System.currentTimeMillis() - startTime,
                    avgCounter.getAvg()
            );
            globalCount += nReqCount; // 累加全局调用次数
            nReqCount += INCREMENT;
        }

        if (globalCount < 10000) {
            Console.error("总共{}次RPC调用结束...", globalCount);
        } else {
            Console.error("总共{}w次RPC调用结束...", NumberUtil.round(globalCount * 1.0f / 10000, 1));
        }
    }

    // 服务熔断测试, 例如当设置了mock="fail:return null", 意为当第一次失败后, 后续全部失败
    // 请先在Provider端注册HelloService的代码处设置.setMock("fail:return null"), 或者在消费端.setMock(...)也可以
    public static void degradationTest(HelloService helloService) throws Exception {
        for (int i = 0; i < 30; i++) {
            long singleStart = System.currentTimeMillis();
            int r = helloService.fusing(5, i + 1);
            long singleElapse = System.currentTimeMillis() - singleStart;
            Console.log("第{}次调用结果:{}, 耗时{}ms", i + 1, r, singleElapse);
        }
    }

    // 发送100条请求,负载均衡, 看服务端各个节点被命中次数
    private static void playLoadBalance(HelloService helloService) throws InterruptedException {
        final int N_THREADS = 200; // 线程数
        CyclicBarrier barrier = new CyclicBarrier(N_THREADS); // 珊栏,让并发更极端
        CountDownLatch latch = new CountDownLatch(N_THREADS); // 计数器,所有线程跑完后输出负载情况
        final long startTime = System.currentTimeMillis();

        // IntStream.range(0, N_THREADS).forEach(i -> {
        //     new Thread(() -> {
        //         try {
        //             barrier.await(); // 在珊栏前等待所有线程一起跑
        //         } catch (InterruptedException | BrokenBarrierException e) {
        //             e.printStackTrace();
        //         }
        //         helloService.loadBalanceCall();
        //         latch.countDown(); // 本次调用完成
        //     }).start();
        // });
        // // 等待调用全部完成
        // latch.await();

        // 同步版
        for (int i = 0; i < N_THREADS; i++) {
            helloService.loadBalanceCall();
        }

        // 输出耗时
        Console.error("{}个请求全部完成,耗时: {}ms", N_THREADS, System.currentTimeMillis() - startTime);
    }

}
